package cn.kgc.gmall.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Minimal Flink SQL job that registers a table backed by the {@code mysql-cdc}
 * connector and prints the result of {@code select *} on it to standard output.
 */
public class Flink_CDC_SQL {

    /**
     * Registers the {@code user_binlog} source table and prints the result of
     * querying it.
     *
     * @param args command-line arguments; not used
     * @throws Exception kept in the signature for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declare the source table: columns `name` and `age`, read through the
        // mysql-cdc connector from table `user` in database `gmall_2022_realtime`.
        // NOTE(review): host, port and credentials are hard-coded here; consider
        // supplying them via program arguments or external configuration.
        tableEnv.executeSql("CREATE TABLE user_binlog (" +
                " name STRING," +
                " age INT" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'database-name' = 'gmall_2022_realtime'," +
                " 'table-name' = 'user'" +
                ")");

        // Query the table and print the rows it produces. executeSql() submits
        // the query as its own job, and print() consumes that job's results.
        tableEnv.executeSql("select * from user_binlog").print();

        // No env.execute() here: this program adds no DataStream operators to
        // `env`, so there is nothing left for it to run, and calling it would
        // fail with "No operators defined in streaming topology".
    }
}
